package com.codejiwei.flink.sink;

import com.codejiwei.flink.entity.WaterSensor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * Flink sink that writes every {@link WaterSensor} record into the {@code sensor} table over
 * plain JDBC.
 *
 * <p>One connection and one prepared statement are created in {@link #open(Configuration)} and
 * reused for all records. Each record is written with its own statement execution; there is no
 * batching. The three positional parameters of the insert are bound, in order, to the record's
 * id, ts and vc.
 *
 * <p>The no-argument constructor connects to a local MySQL instance with the default
 * credentials; use {@link #MyJDBCSink(String, String, String)} to target another database.
 *
 * @author jiwei
 * @date 2023/5/22 17:22
 */
public class MyJDBCSink extends RichSinkFunction<WaterSensor> {

    private static final String DEFAULT_URL = "jdbc:mysql://localhost:3306/test?useSSL=false";
    private static final String DEFAULT_USERNAME = "root";
    private static final String DEFAULT_PASSWORD = "root";

    /** Positional insert; relies on the table's column order being (id, ts, vc). */
    private static final String INSERT_SQL = "insert into sensor values(?,?,?)";

    private final String url;
    private final String username;
    private final String password;

    // Runtime-only JDBC handles: created in open(), released in close(). They are transient so
    // they are never considered part of the function's serialized state.
    private transient Connection conn;
    private transient PreparedStatement ps;

    /** Creates a sink that uses the default local MySQL URL and credentials. */
    public MyJDBCSink() {
        this(DEFAULT_URL, DEFAULT_USERNAME, DEFAULT_PASSWORD);
    }

    /**
     * Creates a sink that connects with the given JDBC settings.
     *
     * @param url      JDBC URL passed to {@link DriverManager#getConnection(String, String, String)}
     * @param username database user
     * @param password database password
     */
    public MyJDBCSink(String url, String username, String password) {
        this.url = url;
        this.username = username;
        this.password = password;
    }

    /**
     * Opens the JDBC connection and prepares the insert statement.
     *
     * <p>If preparing the statement fails, the freshly opened connection is closed before the
     * failure is propagated, so a failed {@code open} does not leave a connection behind.
     *
     * @param parameters configuration handed in by the caller; not read by this sink
     * @throws Exception if the connection cannot be opened or the statement cannot be prepared
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        conn = DriverManager.getConnection(url, username, password);
        try {
            ps = conn.prepareStatement(INSERT_SQL);
        } catch (Exception prepareFailure) {
            try {
                conn.close();
            } catch (Exception closeFailure) {
                // Keep the original failure as the primary error.
                prepareFailure.addSuppressed(closeFailure);
            }
            conn = null;
            throw prepareFailure;
        }
    }

    /**
     * Inserts one record into the {@code sensor} table.
     *
     * @param value   record to write; its id, ts and vc fill parameters 1, 2 and 3
     * @param context sink context; not used
     * @throws Exception if binding the parameters or executing the insert fails
     */
    @Override
    public void invoke(WaterSensor value, Context context) throws Exception {
        ps.setString(1, value.getId());
        ps.setLong(2, value.getTs());
        ps.setInt(3, value.getVc());
        ps.execute();
    }

    /**
     * Releases the prepared statement and the connection.
     *
     * <p>Safe to call when {@code open} never ran or failed part-way (either handle may be
     * {@code null}), and the connection is still closed when closing the statement throws.
     *
     * @throws Exception if closing the statement or the connection fails
     */
    @Override
    public void close() throws Exception {
        // try-with-resources closes in reverse declaration order (statement, then connection),
        // skips null resources, and records a secondary close failure as a suppressed exception.
        try (Connection connection = conn; PreparedStatement statement = ps) {
            // Nothing to execute: closing the resources is the whole job.
        } finally {
            ps = null;
            conn = null;
        }
    }
}
